package com.stars.easyms.schedule.core;

import com.stars.easyms.schedule.bean.DbScheduleServer;
import com.stars.easyms.schedule.exception.DistributedScheduleRuntimeException;
import com.stars.easyms.schedule.util.DistributedThreadPoolExecutor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 分布式调度任务执行器线程池
 *
 * @author guoguifang
 */
class DistributedTaskExecutorManager {

    /**
     * 任务执行者列表
     */
    private List<DistributedTaskExecutor> distributedTaskExecutorList = Collections.synchronizedList(new ArrayList<>());

    /**
     * 分布式线程池
     */
    private DistributedThreadPoolExecutor distributedThreadPoolExecutor;

    /**
     * 任务执行者自增编号
     */
    private AtomicInteger executorId = new AtomicInteger(1);

    /**
     * 启动批量任务执行线程池
     */
    synchronized void start() {
        if (distributedThreadPoolExecutor == null) {
            DbScheduleServer config = DistributedScheduleManager.getSingleInstance().getDbScheduleServerConfig();
            distributedThreadPoolExecutor = new DistributedThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(),
                    config.getKeepAliveTimeMilli(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * 获取当前可执行任务的线程数量，如果结果>=0时不可修改核心线程数量及最大线程数量所以需对修改核心线程数量及最大线程数量加锁
     */
    synchronized int getExecutableCountAndTryLockPoolSize() {
        if (distributedThreadPoolExecutor == null) {
            throw new DistributedScheduleRuntimeException("The distributed scheduling framework thread pool does not start successfully, can't execute task!");
        }
        return distributedThreadPoolExecutor.getExecutableCountAndTryLockPoolSize();
    }

    /**
     * 对修改核心线程数量及最大线程数量解锁
     */
    synchronized void unlockPoolSize() {
        if (distributedThreadPoolExecutor == null) {
            throw new DistributedScheduleRuntimeException("The distributed scheduling framework thread pool does not start successfully, can't execute task!");
        }
        distributedThreadPoolExecutor.unlockPoolSize();
    }

    synchronized boolean execute(Runnable task) {
        if (distributedThreadPoolExecutor == null) {
            throw new DistributedScheduleRuntimeException("The distributed scheduling framework thread pool does not start successfully, can't execute task!");
        }
        return distributedThreadPoolExecutor.execute(task);
    }

    /**
     * 得到一个任务执行者，如果所有的任务执行者都有任务处理则新建一个
     */
    DistributedTaskExecutor getOneDistributedTaskExecutor() {
        for (DistributedTaskExecutor distributedTaskExecutor : distributedTaskExecutorList) {
            if (distributedTaskExecutor.getExecutableSubTask() == null) {
                return distributedTaskExecutor;
            }
        }
        DistributedTaskExecutor distributedTaskExecutor = newDistributedTaskExecutor();
        distributedTaskExecutorList.add(distributedTaskExecutor);
        return distributedTaskExecutor;
    }

    /**
     * 设置分布式线程池核心线程数量
     */
    void setCorePoolSize(int corePoolSize) {
        if (distributedThreadPoolExecutor == null) {
            throw new DistributedScheduleRuntimeException("The distributed scheduling framework thread pool does not start successfully, can't set corePoolSize!");
        }
        distributedThreadPoolExecutor.setCorePoolSize(corePoolSize);
    }

    /**
     * 设置分布式线程池最大线程数量
     */
    void setMaxPoolSize(int maxPoolSize) {
        if (distributedThreadPoolExecutor == null) {
            throw new DistributedScheduleRuntimeException("The distributed scheduling framework thread pool does not start successfully, can't set maxPoolSize!");
        }
        distributedThreadPoolExecutor.setMaxPoolSize(maxPoolSize);
    }

    /**
     * 同时修改分布式线程池核心线程数量与最大线程数量
     */
    void setPoolSize(int corePoolSize, int maxPoolSize) {
        if (distributedThreadPoolExecutor == null) {
            throw new DistributedScheduleRuntimeException("The distributed scheduling framework thread pool does not start successfully, can't set corePoolSize and maxPoolSize!");
        }
        distributedThreadPoolExecutor.setPoolSize(corePoolSize, maxPoolSize);
    }

    /**
     * 设置非核心线程最大空闲时间
     */
    void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
        if (distributedThreadPoolExecutor == null) {
            throw new DistributedScheduleRuntimeException("The distributed scheduling framework thread pool does not start successfully, can't set keepAliveTime!");
        }
        distributedThreadPoolExecutor.setKeepAliveTime(keepAliveTime, timeUnit);
    }

    public List<DistributedTaskExecutor> getDistributedTaskExecutorList() {
        return this.distributedTaskExecutorList;
    }

    /**
     * 描述：生成一个新的任务执行线程
     */
    private DistributedTaskExecutor newDistributedTaskExecutor() {
        return new DistributedTaskExecutor(executorId.getAndIncrement());
    }

    private static DistributedTaskExecutorManager singleInstance;

    static DistributedTaskExecutorManager getSingleInstance() {
        if (singleInstance == null) {
            synchronized (DistributedTaskExecutorManager.class) {
                if (singleInstance == null) {
                    singleInstance = new DistributedTaskExecutorManager();
                }
            }
        }
        return singleInstance;
    }

    private DistributedTaskExecutorManager() {
    }
}
